package cc.sunni.rabbitmq.config;

import cc.sunni.rabbitmq.dao.MqMessageDao;
import cc.sunni.rabbitmq.entity.MqMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
@Slf4j
public class RabbitMQConfig {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MqMessageDao mqMessageDao;

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /**
     * 消息对象序列化器
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate,确保消息不丢失
     * 生产端消ConfirmCallback,ReturnCallback
     * 消费端ACK机制
     */
    @PostConstruct
    public void initRabbitTemplate() {
        // ConfirmCallback消息抵达交换机的回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 交换机(Exchange)收到消息就会回调
             * CorrelationData 当前消息的唯一关联数据
             * ack 消息是否成功送达
             * cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                String messageId = correlationData.getId();
                if (ack) {
                    // 如果confirm返回成功,则更新消息状态
                    MqMessage m = new MqMessage();
                    m.setMessageId(messageId);
                    m.setStatus(Constants.MESSAGE_SUCCESS);
                    mqMessageDao.updateById(m);
                    log.info("ID:[{}]的消息投递成功", correlationData.getId());
                } else {
                    // 失败后定时任务会进行重试
                    log.error("ID:[{}]的消息投递失败,原因:{}", correlationData.getId(), cause);
                }
            }
        });
    }
}
